package com.cscec8b.ct.consumer.coproessor;

import com.cscec8b.ct.common.constant.Names;
import com.cscec8b.ct.common.dao.BaseDao;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

/**
 * @Copyright: Shanghai Definesys Company.All rights reserved.
 * @Description: Coprocessor that adds the callee-side record for each caller record.
 * @author: chuhaitao
 * @since: 2019/2/10 22:19
 * @history: 1.2019/2/10 created by chuhaitao
 * <p>
 * How this coprocessor is put to use:
 * <p>
 * 1. Extend BaseRegionObserver.
 * 2. Implement postPut with the business logic.
 * 3. Attach the coprocessor to the table (done when the table is created).
 * 4. Package it as a jar, place it in HBase and distribute it to the HBase cluster.
 */
public class CalleeProessor extends BaseRegionObserver {

    /** Separator between the fields of a call-log row key. */
    private static final String ROW_KEY_SEPARATOR = "_";

    /**
     * Minimum number of fields a row key must have to be handled here:
     * regionNum_call1_callTime_call2_duration_flag.
     */
    private static final int ROW_KEY_FIELD_COUNT = 6;

    /** Flag value of the records that get mirrored (treated here as caller records). */
    private static final String CALLER_FLAG = "1";

    /** Flag value written into the mirrored (callee) record. */
    private static final String CALLEE_FLAG = "0";

    /**
     * Lets HBase add the callee record automatically: for every put whose row key carries
     * the flag {@code "1"}, a mirrored record (call1/call2 swapped, flag {@code "0"}) is
     * written to the callee column family of the call-log table.
     * <p>
     * Row keys are expected to look like
     * {@code 5_19683537146_20180421043144_15781588029_1987_1}, i.e.
     * {@code regionNum_call1_callTime_call2_duration_flag}. Row keys with fewer fields
     * are ignored.
     *
     * @param e          the current coprocessor context
     * @param put        the put that has just been applied
     * @param edit       the WAL edit of that put (not used)
     * @param durability the durability of that put (not used)
     * @throws IOException if the target table cannot be obtained, written or closed
     */
    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability)
            throws IOException {
        String rowKey = Bytes.toString(put.getRow());

        System.out.println("------rowKey----------" + rowKey);

        if (rowKey == null) {
            return;
        }

        String[] values = rowKey.split(ROW_KEY_SEPARATOR);
        if (values.length < ROW_KEY_FIELD_COUNT) {
            // Not in the call-log row key format; skip it instead of failing
            // with an ArrayIndexOutOfBoundsException inside the region server hook.
            return;
        }

        // Take the individual fields out of the row key.
        String call1 = values[1];
        System.out.println("------call1----------" + call1);
        String call2 = values[3];
        System.out.println("------call2----------" + call2);
        String callTime = values[2];
        System.out.println("------callTime----------" + callTime);
        String duration = values[4];
        System.out.println("------duration----------" + duration);
        String flag = values[5];
        System.out.println("------flag----------" + flag);

        CalleeProessorDao calleeProessorDao = new CalleeProessorDao();
        int pre = calleeProessorDao.getCalleeRegionNum(call2, callTime);

        // Build the row key of the mirrored record: the two numbers are swapped and the flag is "0".
        String calleeRowKey = pre + ROW_KEY_SEPARATOR + call2 + ROW_KEY_SEPARATOR + callTime
                + ROW_KEY_SEPARATOR + call1 + ROW_KEY_SEPARATOR + duration + ROW_KEY_SEPARATOR + CALLEE_FLAG;
        System.out.println("------calleeRowKey----------" + calleeRowKey);

        if (!CALLER_FLAG.equals(flag)) {
            // Only records flagged "1" are mirrored. The mirrored record itself carries "0",
            // so it does not produce a further record when it passes through this hook.
            return;
        }

        Put calleePut = new Put(Bytes.toBytes(calleeRowKey));
        // Column family that receives the mirrored record.
        byte[] calleeFamily = Bytes.toBytes(Names.CF_CALLEE.getValue());
        calleePut.addColumn(calleeFamily, Bytes.toBytes("call1"), Bytes.toBytes(call2));
        calleePut.addColumn(calleeFamily, Bytes.toBytes("call2"), Bytes.toBytes(call1));
        calleePut.addColumn(calleeFamily, Bytes.toBytes("callTime"), Bytes.toBytes(callTime));
        calleePut.addColumn(calleeFamily, Bytes.toBytes("duration"), Bytes.toBytes(duration));
        calleePut.addColumn(calleeFamily, Bytes.toBytes("flag"), Bytes.toBytes(CALLEE_FLAG));

        // The table is opened only when something is written and is closed even if put() fails.
        try (Table table = e.getEnvironment().getTable(TableName.valueOf(Names.TABLE.getValue()))) {
            // Write the mirrored record, NOT the incoming put: re-writing the incoming put would
            // never store the callee record and, if this observer is attached to the target
            // table, would trigger this hook again for the same "1"-flagged row.
            table.put(calleePut);
        }
    }


    /**
     * Thin {@link BaseDao} subclass through which this coprocessor calls
     * {@code getRegionNum}. It is a static nested class because it needs no state
     * of the enclosing observer.
     */
    static class CalleeProessorDao extends BaseDao {

        /**
         * Returns the region number for the given number and time by delegating to
         * {@code getRegionNum(tel, date)}.
         *
         * @param tel  the phone number that leads the row key (the callee's number here)
         * @param date the call time taken from the row key
         * @return the value returned by {@code getRegionNum}
         */
        protected int getCalleeRegionNum(String tel, String date) {
            return getRegionNum(tel, date);
        }
    }
}
